Skip to main content

실시간 로그적재 배치 개발

· 8 min read
Eundo Park
Maintainer

최근 프로젝트에서 1분에 한 번씩 실행되는 배치 작업을 개발하여, 실시간으로 쌓이는 대용량 로그 파일을 읽어와 데이터베이스에 적재하는 기능을 구현했습니다. 이 배치는 대용량 데이터를 빠르게 처리해야 했기 때문에 멀티쓰레드 병렬처리를 적용하여 성능을 최적화했습니다.

개발 주요작업

  • Spring Batch를 이용하여 배치 작업을 구성하고, 파티셔닝과 TaskExecutor를 활용하여 멀티쓰레드 환경에서 로그 파일을 병렬로 처리했습니다. 이를 통해 대량의 데이터를 효율적으로 분산 처리하여, 성능을 최적화했습니다.
@Bean("logProcessingJob")
public Job logProcessingJob(JobRepository jobRepository, @Qualifier("partitionedLogProcessingStep") Step partitionedLogProcessingStep, @Qualifier("moveFilesStep") Step moveFilesStep) {
return new JobBuilder(BatchJobNames.LOG_PROCESSING_BATCH.name(), jobRepository)
.listener(jobExecutionListener())
.start(partitionedLogProcessingStep)
.next(moveFilesStep)
.build();
}
  • JSON 형태로 저장된 로그 데이터를 파싱하고, 이를 데이터베이스에 적재하기 위해 컨버팅 작업을 수행했습니다. 이를 통해 실시간으로 수집된 로그 데이터를 효율적으로 처리하고 저장할 수 있었습니다.
@Override
public LogEntry process(JsonNode jsonNode) {
LogEntry logEntry = new LogEntry();
logEntry.setTimestamp(jsonNode.get("timestamp").asText());
logEntry.setLevel(jsonNode.get("level").asText());
logEntry.setMessage(jsonNode.get("message").asText());
// 기타 필요한 필드 세팅
return logEntry;
}
  • 배치 작업 중 마지막으로 읽은 파일과 위치를 기억하는 로직을 구현하여, 배치가 재실행될 때 이전에 처리한 위치부터 이어서 작업할 수 있도록 했습니다. 이를 통해 데이터의 중복 처리를 방지하고, 배치 작업의 안정성을 높였습니다.
@PostConstruct
public void initialize() throws IOException {
log.debug("Initializing reader for file: {}, isLastFile: {}", filePath, isLastFile);
if (filePath != null) {
File file = new File(filePath);
randomAccessFile = new RandomAccessFile(file, "r");
long startPosition = batchState.getFilePosition(filePath);
randomAccessFile.seek(startPosition);
log.debug("File opened: {}, starting from position: {}", filePath, startPosition);
}
}

@Override
public JsonNode read() throws Exception {
if (randomAccessFile == null || fileFullyRead) {
return null;
}

JsonNode jsonNode = readNextJsonNode();
if (jsonNode != null) {
long currentPosition = randomAccessFile.getFilePointer();
batchState.updateFilePosition(filePath, currentPosition);
log.debug("Read JSON node from file: {}, new position: {}", filePath, currentPosition);
return jsonNode;
} else {
fileFullyRead = true;
batchState.addProcessedFile(filePath);
log.debug("Reached end of file: {}", filePath);
return null;
}
}
  • 배치 작업이 종료된 후 로그 파일을 이동시키는 후처리 작업을 구현하여, 중복 처리를 방지하고 로그 관리의 일관성을 유지했습니다.
@Bean
public Step moveFilesStep() {
return stepBuilderFactory.get("moveFilesStep")
.tasklet(logFileMover)
.build();
}

문제 해결

  • 파티셔닝과 TaskExecutor 사용 중 발생한 문제: 처음에는 파티셔닝을 사용해 로그 파일을 Chunk 단위로 병렬 처리하는 과정에서, Reader가 파일에서 한 건만 읽고 나머지 데이터는 읽지 못하는 문제가 발생했습니다. 이 문제는 Reader가 제대로 설정되지 않아 발생한 것이었고, Reader의 설정을 수정하여 문제를 해결했습니다.
@Override
public JsonNode read() throws Exception {
if (randomAccessFile == null || fileFullyRead) {
return null;
}
JsonNode jsonNode = readNextJsonNode();
if (jsonNode != null) {
long currentPosition = randomAccessFile.getFilePointer();
batchState.updateFilePosition(filePath, currentPosition);
log.debug("Read JSON node from file: {}, new position: {}", filePath, currentPosition);
return jsonNode;
} else {
fileFullyRead = true;
batchState.addProcessedFile(filePath);
log.debug("Reached end of file: {}", filePath);
return null;
}
}
  • 트랜잭션 문제: 배치를 Quartz Scheduler와 연동하여 실행하는 과정에서, PlatformTransactionManager를 사용했을 때 트랜잭션이 제대로 동작하지 않아 커밋이 되지 않는 문제가 있었습니다. 이를 해결하기 위해 JPATransactionManager로 변경했으며, 이후 트랜잭션이 정상적으로 작동하여 문제를 해결할 수 있었습니다.
@Bean
public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
return new JpaTransactionManager(entityManagerFactory); // JPATransactionManager로 변경하여 문제 해결
}

파티셔닝과 병렬처리

  • 데이터 일관성: 파티셔닝과 병렬처리를 적용하면서 가장 주의해야 했던 부분은 데이터의 일관성 유지였습니다. 여러 스레드가 동시에 데이터를 처리하기 때문에, 각 스레드가 중복된 데이터를 처리하거나 동일한 리소스에 동시 접근하여 충돌이 발생하지 않도록 신경을 써야 했습니다.
  • Reader와 Writer의 동기화: 병렬 처리를 적용할 때 Reader가 모든 데이터를 정상적으로 읽고, Writer가 중복 없이 데이터를 처리할 수 있도록 Reader와 Writer 간의 동기화 문제를 해결해야 했습니다. 이 과정에서 Reader의 상태를 정확히 관리하여 파일의 모든 데이터를 빠짐없이 처리할 수 있도록 했습니다.
  • Chunk 크기 조정: 병렬 처리의 성능을 최적화하기 위해 Chunk의 크기를 적절히 조정하는 것도 중요했습니다. 너무 큰 Chunk는 병렬 처리의 이점을 살리지 못하게 하고, 너무 작은 Chunk는 오버헤드를 증가시킬 수 있기 때문에, 적절한 크기를 설정하여 성능을 극대화했습니다.

주요 성과

  • 성능 최적화: 파티셔닝과 TaskExecutor를 활용한 멀티쓰레드 병렬 처리로 대용량 로그 데이터를 신속하게 처리하고, 데이터베이스 적재 속도를 극대화했습니다.
  • 데이터 일관성 유지: 복잡한 JSON 로그 데이터를 정확하게 파싱하여, 데이터 무결성을 유지하며 안정적으로 데이터베이스에 저장했습니다.
  • 안정성 강화: 오류 발생 시에도 시스템이 안정적으로 동작하도록 예외 처리 로직을 강화하여, 데이터 손실 및 중단을 방지했습니다.
  • 연속성 보장: 마지막으로 읽은 파일과 위치를 기억하는 로직을 통해 배치 작업의 연속성을 보장하고, 재실행 시 중복 처리를 방지했습니다.
  • 트랜잭션 문제 해결: JPATransactionManager를 도입해 Quartz Scheduler와의 연동 시 트랜잭션 처리의 안정성을 확보했습니다.

이번 프로젝트를 통해 실시간 데이터 처리의 중요성을 깊이 이해하게 되었으며, 대용량 데이터를 다루는 시스템에서의 성능 최적화와 안정성 확보에 대한 경험을 쌓을 수 있었습니다.